Library Imports
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from datetime import datetime
Template
spark = (
    SparkSession.builder
    .master("local")
    .appName("Exploring Joins")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)
sc = spark.sparkContext
Initial Datasets
pets = spark.createDataFrame(
    [
        (1, 1, datetime(2018, 1, 1, 1, 1, 1), 'Bear', 5),
        (2, 1, datetime(2015, 1, 1, 1, 1, 1), 'Chewie', 10),
        (3, 1, datetime(2015, 1, 1, 1, 1, 1), 'Roger', 15),
    ], ['id', 'breed_id', 'birthday', 'nickname', 'age']
)
pets.toPandas()
|   | id | breed_id | nickname | age |
|---|----|----------|----------|-----|
| 0 | 1  | 1        | Bear     | 5   |
| 1 | 2  | 1        | Chewie   | 10  |
| 2 | 3  | 1        | Roger    | 15  |
Scenario #1
No `orderBy` specified for the window object.
window_1 = Window.partitionBy('breed_id')
df_1 = pets.withColumn('foo', (F.sum(F.col('age')).over(window_1)))
df_1.toPandas()
|   | id | breed_id | nickname | age | foo |
|---|----|----------|----------|-----|-----|
| 0 | 1  | 1        | Bear     | 5   | 30  |
| 1 | 2  | 1        | Chewie   | 10  | 30  |
| 2 | 3  | 1        | Roger    | 15  | 30  |
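For intuition, a window sum with no ordering behaves like a plain aggregation whose result is broadcast back onto every row of the partition. A quick sketch (the `breed_totals` name is just for illustration, reusing the `pets` DataFrame above):

```python
# Sketch: equivalent to the unordered window sum in Scenario #1 --
# aggregate per breed_id, then join the total back onto every row.
breed_totals = pets.groupBy('breed_id').agg(F.sum('age').alias('foo'))
pets.join(breed_totals, on='breed_id', how='left').toPandas()
```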
Scenario #2
`orderBy` with no `rowsBetween` specified for the window object.
window_2 = (
    Window
    .partitionBy('breed_id')
    .orderBy(F.col('id'))
)
df_2 = pets.withColumn('foo', (F.sum(F.col('age')).over(window_2)))
df_2.toPandas()
|   | id | breed_id | nickname | age | foo |
|---|----|----------|----------|-----|-----|
| 0 | 1  | 1        | Bear     | 5   | 5   |
| 1 | 2  | 1        | Chewie   | 10  | 15  |
| 2 | 3  | 1        | Roger    | 15  | 30  |
Scenario #3
`orderBy` with a `rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)` specified for the window object.
window_3 = (
    Window
    .partitionBy('breed_id')
    .orderBy(F.col('id'))
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)
df_3 = pets.withColumn('foo', (F.sum(F.col('age')).over(window_3)))
df_3.toPandas()
|   | id | breed_id | nickname | age | foo |
|---|----|----------|----------|-----|-----|
| 0 | 1  | 1        | Bear     | 5   | 30  |
| 1 | 2  | 1        | Chewie   | 10  | 30  |
| 2 | 3  | 1        | Roger    | 15  | 30  |
Why is This?
df_1.explain()
== Physical Plan ==
Window [sum(age#3L) windowspecdefinition(breed_id#1L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS foo#9L], [breed_id#1L]
+- *(1) Sort [breed_id#1L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(breed_id#1L, 200)
+- Scan ExistingRDD[id#0L,breed_id#1L,nickname#2,age#3L]
df_2.explain()
== Physical Plan ==
Window [sum(age#3L) windowspecdefinition(breed_id#1L, id#0L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS foo#216L], [breed_id#1L], [id#0L ASC NULLS FIRST]
+- *(1) Sort [breed_id#1L ASC NULLS FIRST, id#0L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(breed_id#1L, 200)
+- Scan ExistingRDD[id#0L,breed_id#1L,nickname#2,age#3L]
df_3.explain()
== Physical Plan ==
Window [sum(age#3L) windowspecdefinition(breed_id#1L, id#0L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS foo#423L], [breed_id#1L], [id#0L ASC NULLS FIRST]
+- *(1) Sort [breed_id#1L ASC NULLS FIRST, id#0L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(breed_id#1L, 200)
+- Scan ExistingRDD[id#0L,breed_id#1L,nickname#2,age#3L]
TL;DR
Looking at the physical plans, the default behaviour for `Window.partitionBy('col_1').orderBy('col_2')` with no explicit frame is a growing frame, equivalent to `.rangeBetween(Window.unboundedPreceding, Window.currentRow)` (note the `RangeFrame` in the plan for `df_2`), not a frame that covers the whole partition.
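We can sanity-check this by spelling the implicit frame out and confirming it reproduces the Scenario #2 output. This is just a quick sketch reusing the `pets` DataFrame from above; `rangeBetween` matches the `RangeFrame` in the plan, and it only differs from `rowsBetween` when there are ties in the `orderBy` column (there are none here).

```python
# Making the implicit growing frame explicit; this should give the same
# running sums as Scenario #2 (5, 15, 30).
window_2_explicit = (
    Window
    .partitionBy('breed_id')
    .orderBy(F.col('id'))
    .rangeBetween(Window.unboundedPreceding, Window.currentRow)
)

df_2_explicit = pets.withColumn('foo', F.sum(F.col('age')).over(window_2_explicit))
df_2_explicit.toPandas()
```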
Looking at the Scala source, we can see that this is indeed the default and intended behaviour: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala#L36-L38
* @note When ordering is not defined, an unbounded window frame (rowFrame, unboundedPreceding,
* unboundedFollowing) is used by default. When ordering is defined, a growing window frame
* (rangeFrame, unboundedPreceding, currentRow) is used by default.
Problem: this default will cause problems if you care about all the rows in the partition, not just the rows up to and including the current one.
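As a concrete illustration of the pitfall (a small, hypothetical sketch reusing the `pets` DataFrame above), suppose we want each pet's share of its breed's total age. With the default growing frame the "total" is only a running total, so the fractions come out silently wrong; spelling out an unbounded frame fixes it.

```python
# Default growing frame: the sum only covers rows up to the current one.
window_running = Window.partitionBy('breed_id').orderBy(F.col('id'))

# Explicit unbounded frame: the sum covers the whole partition.
window_full = (
    Window
    .partitionBy('breed_id')
    .orderBy(F.col('id'))
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)

df_shares = (
    pets
    .withColumn('share_wrong', F.col('age') / F.sum('age').over(window_running))  # 5/5, 10/15, 15/30
    .withColumn('share_right', F.col('age') / F.sum('age').over(window_full))     # 5/30, 10/30, 15/30
)
df_shares.toPandas()
```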